Assignment 4 - Digital signal processing and analysis¶

Course "Data processing and Visualization", IE500417, NTNU.

https://www.ntnu.edu/studies/courses/IE500417

Note (as usual): plagiarism is strictly forbidden! You should never copy any source code from other students. If you use any code written by others (except the standard libraries: NumPy, SciPy, Pandas, etc.), provide a reference.

If the teachers see that your work is mostly copy+paste from online code snippets, the grade can be reduced.

If a case of plagiarism is detected, it will be reported to the administration.

Task description¶

In this assignment, you will practice digital signal processing (in a rather basic form; there will be no advanced DSP methods). You will work with two signals simultaneously. As sometimes happens, the two signals are not synchronized: they are sampled at different time moments and at different sampling rates. You will have to resample and synchronize them so that both signals have the same sample timestamps. You will then do some analysis of the signals and visualize them using line charts.

Submission details (as usual)¶

The assignment must be handed in on Blackboard. The following must be handed in:

  1. Report in PDF or HTML format describing the results of this assignment. Preferably, it is generated from the Jupyter notebook you used (Hint: In Jupyter: File > Download as > HTML). Alternatively (if you use plain Python or other tools), prepare a readable report that contains figures and source code snippets necessary to understand your work.
  2. Source code that you used to generate the results. This could be the Jupyter notebook file, Python source files, Matlab files, etc.

Deadlines and grading information on Blackboard.

In [1]:
import pandas as pd
import numpy as np
from pathlib import Path
import math
import warnings
import os
from datetime import datetime, timedelta
import plotly.express as px
import plotly.graph_objects as go
from scipy.fft import fft, fftfreq
from scipy.signal import resample
from scipy.interpolate import interp1d

# Suppress library warnings by replacing warnings.warn with a no-op
def warn(*args, **kwargs):
    pass
warnings.warn = warn

DATA_DIR = Path("../data")
In [2]:
# Utility functions

# Plots the frequency spectrum of a signal together with the amplitude threshold line
def plot_freq(amp, freq, threshold):
    threshold_line = np.full_like(freq, threshold)
    
    fig = go.Figure([
        go.Scatter(x=freq, y=amp, line=go.scatter.Line(dash="solid", width=1, color="pink"), name="Signal"),
        go.Scatter(x=freq, y=threshold_line, line=go.scatter.Line(dash="dot", width=1, color="green"), name="Threshold"),
    ])
    fig.layout.template = "plotly_dark"
    fig.update_layout(
        title = dict(
            font = dict(
                size = 20
            ),
            text = "Frequency Domain"
        ),
        height = 600,
        yaxis_title="Amplitude",
        xaxis_title="Frequency"
    )
    fig.update_xaxes(
        tickangle = -45,
    )
    fig.update_layout(title_x=0.5)
    fig.update_xaxes(range=[0, 20])
    fig.update_yaxes(range=[0, 0.5])
    fig.show()
    
# Plots original and resampled data
def plot_signals(signal, org_signal):
    fig = go.Figure([
        go.Scatter(x=org_signal['time'], y=org_signal['value'], line=go.scatter.Line(dash="solid", width=1, color="pink"), name="Original"),
        go.Scatter(x=signal['time'], y=signal['value'], line=go.scatter.Line(dash="dot", width=2, color="green"), name="Resampled")
    ])
    fig.layout.template = "plotly_dark"
    fig.update_layout(
        title = dict(
            font = dict(
                size = 20
            ),
            text = "Signal Values",    
        ),
        height = 600,
        yaxis_title="Value",
        xaxis_title="Sample time"
    )
    fig.update_layout(title_x=0.5)
    fig.update_xaxes(
        tickangle = -45,
    )
    fig.show()

Part 1: Understanding the signals (25%)¶

Step 1.1: Load the two signals from CSV files: s1.csv and s2.csv.

In [3]:
# Your code here
sig_1 = pd.read_csv(DATA_DIR / "s1.csv")
sig_2 = pd.read_csv(DATA_DIR / "s2.csv")

Step 1.2: Do a quick analysis of what data you got: column names of each signal and number of rows.

In [4]:
# Your code here
sig_1 = sig_1.rename(columns={"s1": "value"}) #renaming it to value for consistency
sig_1.head()
Out[4]:
time value
0 2017-08-29 10:30:00.000 15.12
1 2017-08-29 10:30:00.010 15.01
2 2017-08-29 10:30:00.020 14.51
3 2017-08-29 10:30:00.030 14.94
4 2017-08-29 10:30:00.040 14.96
In [5]:
sig_1.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   time    10000 non-null  object 
 1   value   10000 non-null  float64
dtypes: float64(1), object(1)
memory usage: 156.4+ KB
In [6]:
sig_1.describe()
Out[6]:
value
count 10000.000000
mean 82.165242
std 38.120285
min 13.420000
25% 49.183750
50% 76.800000
75% 106.825000
max 201.240000
In [7]:
sig_2 = sig_2.rename(columns={"s2": "value"}) #renaming for consistency
sig_2.head()
Out[7]:
time value
0 2017-08-29 10:30:00.000 241.17
1 2017-08-29 10:30:00.190 238.04
2 2017-08-29 10:30:00.400 239.95
3 2017-08-29 10:30:00.520 242.19
4 2017-08-29 10:30:00.700 244.66
In [8]:
sig_2.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 810 entries, 0 to 809
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   time    810 non-null    object 
 1   value   810 non-null    float64
dtypes: float64(1), object(1)
memory usage: 12.8+ KB
In [9]:
sig_2.describe()
Out[9]:
value
count 810.000000
mean 254.646198
std 32.345325
min 182.840000
25% 227.762500
50% 256.930000
75% 282.846250
max 312.000000

Step 1.3: One of the signals is sampled at an even (uniform) frequency, the other is not. Find out which is the nonuniformly sampled signal. Store it in variable signal_x. Store the uniformly sampled signal in variable signal_u.

Note: "find out" here means "write code that finds out". If you manually assign the signal_u and signal_x variables, you won't get points for this step. The reason: manual assignment is not flexible. If the dataset changes, your remaining notebook calculations will suddenly be wrong. Flexible code that finds the necessary signals would keep working even if the s1.csv and s2.csv files were swapped.

In [10]:
# Your code here
def to_datetime(datetime_str):
    return datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S.%f')

def is_sampling_freq_even(sig):
    num_of_samples = sig["time"].shape[0]
    assert num_of_samples >= 3, "Need at least 3 points to check"
    timestamps = sig["time"].apply(to_datetime)
    time_delta = timestamps.diff()[1:]
    # Uniform sampling means every inter-sample interval equals the first one
    return (time_delta == time_delta.iloc[0]).all()

signal_x = sig_2 if is_sampling_freq_even(sig_1) else sig_1
signal_u = sig_2 if signal_x is not sig_2 else sig_1

# Report which signal is which
print(f"{'Signal 1' if signal_u is sig_1 else 'Signal 2'} is the uniformly sampled signal")
print(f"{'Signal 1' if signal_x is sig_1 else 'Signal 2'} is the non-uniformly sampled signal")
Signal 1 is the uniformly sampled signal
Signal 2 is the non-uniformly sampled signal

Step 1.4. Plot the two signals in a line chart:

  • Both lines in a single chart
  • Add a legend with label for each signal
  • Signal U should be Green dashed line with line width=2
  • Signal X should be Blue solid line with line width=1.
  • Chart should have a title, font size = 20
In [11]:
# Your code here
signal_X_converted = signal_x.copy()
signal_X_converted["time"] = signal_X_converted["time"].apply(to_datetime)

signal_U_converted = signal_u.copy()
signal_U_converted["time"] = signal_U_converted["time"].apply(to_datetime)

fig = go.Figure([
    go.Scatter(x=signal_U_converted['time'], y=signal_U_converted['value'], line=go.scatter.Line(dash="dash", width=2, color="green"), name="Signal U"),
    go.Scatter(x=signal_X_converted['time'], y=signal_X_converted['value'], line=go.scatter.Line(dash="solid", width=1, color="blue"), name="Signal X")
])
fig.layout.template = "plotly_dark"
fig.update_layout(
    title = dict(
        font = dict(
            size = 20
        ),
        text = "Signal Values"
    ),
    height = 600,
    yaxis_title="Value",
    xaxis_title="Sample time"
)
fig.update_layout(title_x=0.5)
fig.update_xaxes(
    tickangle = -45,
)
fig.show()

A reference showing approximately how it could look:

Step 1.5: Find out the sampling frequency of Signal U, save it in variable f_u.

In [12]:
# Your code here
time_per_sample = (signal_U_converted['time'][1] - signal_U_converted['time'][0]).total_seconds()

f_u = 1/ time_per_sample

print(f"The sampling frequency of Signal U: {f_u} Hz")
The sampling frequency of Signal U: 100.0 Hz

Step 1.6: Find out which are the highest frequencies used in Signal U. Save the highest frequency in variable b_u, having Hz as units.

Hint: use Fourier transform, and find max frequency having a significant amplitude. There may be many frequencies with tiny amplitudes. Use some threshold to filter these out.

In [13]:
# Your code here
num_samples = signal_U_converted['value'].shape[0]
np_array = signal_U_converted['value'].to_numpy()

# Amplitude threshold
AMPLITUDE_THRESHOLD = 0.001

# Amplitude spectrum (magnitude of the complex FFT coefficients)
yf = np.abs(fft(np_array, norm="forward"))

# Frequency bins
xf = fftfreq(num_samples, time_per_sample)

# Highest frequency whose amplitude reaches the threshold
b_u = xf[yf >= AMPLITUDE_THRESHOLD].max()

print(f"The highest constituent frequency in Signal U: {b_u} Hz")
plot_freq(yf, xf, threshold=AMPLITUDE_THRESHOLD)
The highest constituent frequency in Signal U: 49.980000000000004 Hz

Step 1.7: Find out the minimum frequency at which Signal U should have been sampled to still contain all the information in the signal. Save it in variable fs_u.

Hint: Nyquist-Shannon theorem

In [14]:
# Your code here

# The theory says the sampling freq should be more than twice the highest freq in the signal
fs_u = math.ceil(2 * b_u) # rounding it up since the freq needs to be equal or higher

print(f"The minimum sampling rate required for Signal U: {fs_u} Hz")
The minimum sampling rate required for Signal U: 100 Hz
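To make the Nyquist-Shannon requirement concrete, here is a small self-contained sketch (using a hypothetical 60 Hz test tone, not the assignment data) of what happens when a signal is sampled below twice its highest frequency: the tone aliases to a lower frequency.

```python
import numpy as np

# Densely sampled "analog" reference: a 60 Hz sine observed for 1 second
fs_true, f_sig = 1000, 60.0
t = np.arange(0, 1, 1 / fs_true)
x = np.sin(2 * np.pi * f_sig * t)

def dominant_freq(sig, fs):
    """Frequency bin with the largest magnitude in the positive spectrum."""
    spectrum = np.abs(np.fft.rfft(sig))
    freqs = np.fft.rfftfreq(len(sig), 1 / fs)
    return freqs[np.argmax(spectrum)]

# 100 Hz < 2 * 60 Hz violates the Nyquist criterion:
# the 60 Hz tone shows up at |60 - 100| = 40 Hz instead
fs_low = 100
x_low = np.sin(2 * np.pi * f_sig * np.arange(0, 1, 1 / fs_low))
print(dominant_freq(x, fs_true))     # 60.0
print(dominant_freq(x_low, fs_low))  # 40.0
```

Sampling at fs_u = 100 Hz is safe for Signal U only because its highest constituent frequency stays just below 50 Hz.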

Step 1.8: Calculate how many % of space is wasted by storing too many samples for Signal U. I.e., if we resampled Signal U at sampling rate fs_u, how many samples would we store, and how does that compare to the number of samples in the CSV file?

If it is 0, why?

P.S. Don't worry about Signal X – the sampling system for it was designed by careless engineers who did not know about Nyquist-Shannon's sampling theorem. Therefore, the sampling of Signal X is not proper. But we work with what we have.

In [15]:
# Your code here
num_samples = signal_U_converted['value'].shape[0]
time_duration_in_seconds = (signal_U_converted['time'].max() - signal_U_converted['time'].min()).total_seconds()
optimal_num_samples = round(time_duration_in_seconds * fs_u) # rounding it here since partial samples make no sense

print(f"Space wasted: {round((num_samples - optimal_num_samples) * 100 / num_samples)}%")
Space wasted: 0%

--- YOUR ANSWER HERE ---
The amplitude threshold determines the highest constituent frequency and thus, via the Nyquist-Shannon theorem, the minimum sampling frequency. A threshold of 0.001 yields a minimum sampling frequency of 100 Hz. Multiplying this by the duration of the signal gives 10,000 points, which equals the original number of points in the signal. Hence no space is wasted by the original sampling.

Part 2: Synchronizing the signals – resampling (25%)¶

Note: whenever you modify one of the signals, it is suggested to store the modified signal in another variable. Keep the original one intact. You may later want to compare the two.

Step 2.1: Decimate (down-sample) the Signal U to 10Hz, store it in variable su_resampled:

In [16]:
# Your code here
num_samples_in_10hz = round(time_duration_in_seconds * 10)

su_resampled_intermediate = resample(signal_U_converted['value'], num_samples_in_10hz, signal_U_converted['time'])

su_resampled = pd.DataFrame(data={"value": su_resampled_intermediate[0], "time": su_resampled_intermediate[1]})

# Plotting resampled signal
plot_signals(su_resampled, signal_U_converted)

Step 2.2: Explain - why is the resampled signal not containing the same information as the original signal (i.e., what information is lost?)

--- YOUR ANSWER HERE ---
The resampled signal loses information because the new 10 Hz rate is far below the 100 Hz minimum computed earlier. What is lost are the high-frequency constituent components: at a 10 Hz sampling rate the Nyquist limit drops to 5 Hz, so any component above 5 Hz can no longer be represented.
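A synthetic sketch can make this loss visible (a hypothetical 2 Hz + 8 Hz test signal, not the assignment data): after downsampling to 10 Hz with scipy.signal.resample, the component above the new 5 Hz Nyquist limit vanishes from the spectrum.

```python
import numpy as np
from scipy.signal import resample

fs = 100
t = np.arange(0, 2, 1 / fs)  # 2 s at 100 Hz
x = np.sin(2 * np.pi * 2 * t) + 0.5 * np.sin(2 * np.pi * 8 * t)

# Downsample to 10 Hz: the new Nyquist limit is 5 Hz,
# so the 8 Hz component cannot survive the resampling
x_10 = resample(x, int(len(x) * 10 / fs))

def spectrum_peaks(sig, fs, thresh=0.1):
    """Frequencies whose (normalized) amplitude exceeds the threshold."""
    amps = np.abs(np.fft.rfft(sig)) / len(sig) * 2
    freqs = np.fft.rfftfreq(len(sig), 1 / fs)
    return set(freqs[amps > thresh])

print(spectrum_peaks(x, fs))     # {2.0, 8.0}
print(spectrum_peaks(x_10, 10))  # {2.0} - the 8 Hz component is gone
```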

From now on, in all the places where you need to do something with Signal U, use the resampled version: su_resampled.

Step 2.3: Synchronize Signal X with Signal U, store it in variable sx_resampled. I.e., if su_resampled is sampled at time moments t0, t1, …, tN, then resample Signal X at the same time moments: t0, t1, …, tN. This may involve several steps, depending on what functions/libraries you use.

Hint: see the 07-2-Resampling.ipynb example notebook on Blackboard.

Hint: your resulting sx_resampled should be 10Hz signal, not 100Hz.

In [17]:
# Your code here
# Upsample Signal X first so the interpolation onto the reference timestamps (i.e. Signal U) works from a denser grid
upsampled_X = (signal_X_converted.copy().set_index("time").resample('5ms').asfreq()).interpolate(method='linear').reset_index()

interpolation_fn = interp1d(upsampled_X["time"].apply(datetime.timestamp), upsampled_X["value"])
new_values = interpolation_fn(su_resampled["time"].apply(datetime.timestamp))

sx_resampled = pd.DataFrame({"value": new_values, "time": su_resampled["time"]})

# Plotting the graph
plot_signals(sx_resampled, signal_X_converted)
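For purely linear interpolation, the intermediate 5 ms upsampling pass is not strictly required: evaluating the piecewise-linear interpolant of the original samples directly at the target timestamps produces the same values. A minimal sketch with hypothetical irregular data (not the assignment signals), using np.interp:

```python
import numpy as np
import pandas as pd

# Hypothetical irregularly sampled source signal and a uniform 10 Hz target grid
rng = np.random.default_rng(1)
src_t = np.cumsum(rng.uniform(0.05, 0.3, 100))  # irregular timestamps (seconds)
src_v = np.sin(src_t)
target_t = np.arange(src_t[0], src_t[-1], 0.1)  # 10 Hz grid inside the source range

# np.interp evaluates the piecewise-linear interpolant at all targets in one call
sx_sketch = pd.DataFrame({"time": target_t,
                          "value": np.interp(target_t, src_t, src_v)})
print(sx_sketch.shape)
```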

Step 2.4: Check if the two signals really are synchronized – compare the timestamps, these should be equal.

In [18]:
# Your code here
assert sx_resampled["time"].equals(su_resampled["time"])

Step 2.5: Take both signals and insert them into a single DataFrame object (name it composed_data) which has:

  • Timestamps as the index column
  • Two columns named signalX and signalU containing the corresponding values (sx_resampled and su_resampled).
In [19]:
# Your code here
composed_data = pd.DataFrame({"time": sx_resampled["time"], "signalX": sx_resampled["value"], "signalU": su_resampled["value"] }).set_index("time")

composed_data
Out[19]:
signalX signalU
time
2017-08-29 10:30:00.000 241.170000 84.916675
2017-08-29 10:30:00.100 239.522632 0.184510
2017-08-29 10:30:00.200 238.130952 22.706012
2017-08-29 10:30:00.300 239.040476 12.151147
2017-08-29 10:30:00.400 239.950000 19.746830
... ... ...
2017-08-29 10:31:39.500 257.840000 152.423106
2017-08-29 10:31:39.600 257.491667 149.991113
2017-08-29 10:31:39.700 258.273529 161.480336
2017-08-29 10:31:39.800 260.720588 141.190984
2017-08-29 10:31:39.900 262.680769 167.268946

1000 rows × 2 columns

Part 3: Find extreme values (20%)¶

In this part you will find extreme values in the signals. Typically, these could mean outliers, sampling errors or extreme modes of operation in the system (such as overheating of a motor).

Step 3.1: Find Signal U values (su_resampled) above 170.0. Store those in variable extreme_u_vals.

In [20]:
# Your code here
extreme_u_vals = su_resampled["value"][su_resampled["value"] > 170].to_numpy()

print(f"There are {extreme_u_vals.shape[0]} outliers")
There are 18 outliers

Step 3.2: Find Signal X values (sx_resampled) outside the range mean ± 2* StdDev. Store those in variable ex_static_x_vals.

In [21]:
# Your code here
mean = sx_resampled["value"].mean()
stdDev = sx_resampled["value"].std()

ex_static_x_vals = sx_resampled["value"][ (sx_resampled["value"] > (mean + 2 * stdDev)) | (sx_resampled["value"] < (mean - 2 * stdDev)) ].to_numpy()

print(f"There are {ex_static_x_vals.shape[0]} outliers")
There are 7 outliers

Step 3.3: Find Signal X values (sx_resampled) outside an adaptive range: 3-second moving average ± 2 * StdDev. Both the average and StdDev are calculated in a 3-second rolling window. Store the values in variable ex_dynamic_x_vals.

In [22]:
# Your code here
def find_init_window_idxs(signal, window_time):
    window_start_idx = 0
    window_stop_idx = signal["time"].shape[0]

    time_stop = signal["time"][0] + timedelta(seconds=window_time)

    for idx in range(window_start_idx, window_stop_idx):
        if signal["time"][idx] >= time_stop:
            window_stop_idx = idx
            break
    
    return (window_start_idx, window_stop_idx)

# returns a tuple where 1st item is mean and 2nd is std
def compute_stats(data_window):
    return (data_window.mean(), data_window.std())

def rolling_stat_generator(signal, TIME_WINDOW_in_S):
    (window_start_idx, window_stop_idx) = find_init_window_idxs(signal, TIME_WINDOW_in_S)
    
    signal = signal.copy()
    signal["rolling_mean"] = None
    signal["rolling_std"] = None
    signal["is_outlier"] = False
    
    while window_stop_idx < signal["time"].shape[0]:
        # no outlier check is possible before a full window of history exists
        (mean, std) = compute_stats(signal["value"][window_start_idx:window_stop_idx + 1])

        signal.loc[window_stop_idx, "rolling_mean"] = mean
        signal.loc[window_stop_idx, "rolling_std"] = std

        if abs(signal["value"][window_stop_idx] - mean) > 2 * std:
            signal.loc[window_stop_idx, "is_outlier"] = True

        window_start_idx += 1
        window_stop_idx += 1
        
    return signal


TIME_WINDOW_in_S = 3
ex_dynamic_x = rolling_stat_generator(sx_resampled, TIME_WINDOW_in_S)
ex_dynamic_x_vals = ex_dynamic_x[ex_dynamic_x["is_outlier"]]["value"].to_numpy()

print(f"There are {ex_dynamic_x_vals.shape[0]} outliers")
There are 155 outliers
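As a cross-check of the hand-rolled loop above, pandas offers time-based rolling windows directly via rolling('3s'). A sketch on hypothetical 10 Hz data (not the assignment signal; counts near the start of the series can differ slightly from the loop because of how partial windows are handled):

```python
import numpy as np
import pandas as pd

# Hypothetical 10 Hz signal standing in for sx_resampled (not the assignment data)
rng = np.random.default_rng(0)
times = pd.date_range("2017-08-29 10:30:00", periods=200, freq="100ms")
values = np.sin(np.linspace(0, 8 * np.pi, 200)) + rng.normal(0, 0.1, 200)
values[50] += 3.0  # inject one obvious outlier
sig = pd.DataFrame({"value": values}, index=times)

# Time-based trailing window: each row's stats cover the preceding 3 seconds
roll = sig["value"].rolling("3s")
mean, std = roll.mean(), roll.std()
is_outlier = (sig["value"] - mean).abs() > 2 * std

print(sig["value"][is_outlier].index)
```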

Part 4: Extra challenges (30%)¶

Step 4.1: Plot a line chart with values, rolling mean, normal boundaries (mean +/- 2StdDev) and the extreme values ex_dynamic_x_vals that you calculated in Part 3. Example:

In [23]:
# Your code here
def plot_outliers(signal):
    signal = signal.copy()
    signal["M + 2S"] = signal["rolling_mean"] + 2 * signal["rolling_std"]
    signal["M - 2S"] = signal["rolling_mean"] - 2 * signal["rolling_std"]
    outliers = signal[signal["is_outlier"]]


    fig = go.Figure([
        go.Scatter(x=signal['time'], y=signal['value'], line=go.scatter.Line(dash="solid", width=1, color="blue"), name="Signal"),
        go.Scatter(x=signal['time'], y=signal['rolling_mean'], line=go.scatter.Line(dash="solid", width=1, color="yellow"), name="Rolling Mean"),
        go.Scatter(x=signal['time'], y=signal['M + 2S'], line=go.scatter.Line(dash="dot", width=1, color="green"), name="Rolling Mean + 2(STD)"),
        go.Scatter(x=signal['time'], y=signal['M - 2S'], line=go.scatter.Line(dash="dot", width=1, color="green"), name="Rolling Mean - 2(STD)"),
        go.Scatter(x=outliers['time'], y=outliers['value'], line=go.scatter.Line(dash="dot", width=1, color="red"), mode="markers", name="Outliers")
    ])
    fig.layout.template = "plotly_dark"
    fig.update_layout(
        title = dict(
            font = dict(
                size = 20
            ),
            text = "Outliers"
        ),
        height = 600,
        yaxis_title="Value",
        xaxis_title="Sample time"
    )
    fig.update_xaxes(
        tickangle = -45,
    )
    fig.update_layout(title_x=0.5)
    fig.show()

plot_outliers(ex_dynamic_x)

Reference Image

Step 4.2: Find segments in Signal X (sx_resampled) where the 10-second moving average value is increasing for a continuous period of at least two seconds.

In [24]:
# Your code here
def check_positive_monotone_over_window(signal, time_in_s):
    (window_start_idx, window_stop_idx) = find_init_window_idxs(signal, time_in_s)
    
    signal = signal.copy()

    n_key = f"is_increasing_{time_in_s}"
    signal[n_key] = False
    
    while window_stop_idx < signal["time"].shape[0]:
        # If the rolling mean is not available yet, move on to the next window
        if signal["rolling_mean"][window_start_idx] is None:
            window_start_idx += 1
            window_stop_idx += 1
            continue

        # Mark the whole window if the mean kept increasing throughout it
        if signal["is_increasing"][window_start_idx:window_stop_idx + 1].all():
            signal.loc[window_start_idx:window_stop_idx, n_key] = True

        window_start_idx += 1
        window_stop_idx += 1
    return (signal, n_key)

TIME_WINDOW_in_S = 10
ex_dynamic_x_10 = rolling_stat_generator(sx_resampled, TIME_WINDOW_in_S)

# Mark each sample where the 10 s rolling mean increased compared to the previous sample
ex_dynamic_x_10["is_increasing"] = ex_dynamic_x_10["rolling_mean"].fillna(0).diff() > 0

TIME_WINDOW_in_S_mono = 2
(ex_dynamic_x_10_2, n_key) = check_positive_monotone_over_window(ex_dynamic_x_10, TIME_WINDOW_in_S_mono)

ex_dynamic_x_10_2_vals = ex_dynamic_x_10_2[ex_dynamic_x_10_2[n_key]]["value"].to_numpy()
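A largely vectorized alternative (sketched on a hypothetical slow sine, not the assignment data) is to compute the 10-second moving average with pandas, mark the rows where it rose, then group consecutive marks into runs and keep those lasting at least two seconds:

```python
import numpy as np
import pandas as pd

# Hypothetical 10 Hz signal: a slow sine with a 20 s period, observed for 30 s
times = pd.date_range("2017-08-29 10:30:00", periods=300, freq="100ms")
t = np.arange(300) * 0.1
sig = pd.Series(np.sin(2 * np.pi * t / 20.0), index=times)

# 10-second trailing moving average, then mark rows where it rose vs. the previous row
ma = sig.rolling("10s").mean()
increasing = ma.diff() > 0

# Label consecutive runs of equal boolean values, keep rising runs lasting >= 2 s
run_id = (increasing != increasing.shift()).cumsum()
segments = [run.index for _, run in increasing[increasing].groupby(run_id)
            if run.index[-1] - run.index[0] >= pd.Timedelta(seconds=2)]

print(len(segments))
```

With this hypothetical sine, the moving average rises twice for well over two seconds, so two segments are found; very short spurious runs are filtered out by the duration condition.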

Step 4.3: Plot a line chart and mark these regions (from previous step) in a different color.

For example: show the normal line in blue color and the continuously increasing moving average segments in green. Example:

In [25]:
# Your code here
def signal_splitter(signal, n_key):
    result = []
    init_idx = 0
    final_idx = 0
    is_tracking = False
    
    while final_idx < signal.shape[0]:
        if signal[n_key][init_idx]:
            is_tracking = True
        else:
            init_idx += 1
            final_idx += 1
        
        if is_tracking:
            if (final_idx + 1) == signal.shape[0] or not signal[n_key][final_idx + 1]:
                result.append(signal[init_idx: final_idx + 1])
                is_tracking = False
                final_idx += 1
                init_idx = final_idx
            else:
                final_idx += 1
    return result

def plot_positive_monotone(signal, n_key):
    signal = signal.copy()
    
    trace_array = [go.Scatter(x=signal['time'], y=signal['value'], line=go.scatter.Line(dash="solid", width=1, color="blue"), name="Signal"),]
    
    split_sig_arr = signal_splitter(signal, n_key)
    
    for data in split_sig_arr:
        trace_array.append(
            go.Scatter(
                mode="lines+markers",
                x=data['time'], 
                y=data['rolling_mean'], 
                line=go.scatter.Line(dash="solid", width=1, color="green"),
                showlegend=False
            ))

    fig = go.Figure(trace_array)
    fig.layout.template = "plotly_dark"
    fig.update_layout(
        title = dict(
            font = dict(
                size = 20
            ),
            text = "2s Monotonic Increasing Segments"
        ),
        height = 600,
        yaxis_title="Value",
        xaxis_title="Sample time"
    )
    fig.update_layout(title_x=0.5)
    fig.update_xaxes(
        tickangle = -45,
    )
    fig.show()

plot_positive_monotone(ex_dynamic_x_10_2, n_key)

Reference Image

Reflection¶

Please reflect on the following questions:

  1. How did the assignment go? Was it easy or hard?

A. It went fine; not too tough, nor too easy.

  2. How many hours did you spend on it?

A. About 4 hours.

  3. What was the most time-consuming part?

A. Parts 2, 3 and 4.

  4. If you need to do similar things later in your professional life, how can you improve? How can you do it more efficiently?

A. Having a list of helper functions as a toolkit might be useful.

  5. Was there something you would expect to learn that this exercise did not include?

A. No, it had everything I was hoping to learn.

  6. Was there something that does not make sense?

A. The sampling frequency in 1.6.